Curve 是云原生计算基金会 (CNCF) Sandbox 项目,是网易主导自研和开源的高性能、易运维、云原生的分布式存储系统。人类大脑的思维是线性思维,很难同时做两件事情,涉及到编程问题,我们基本上也是以顺序编程为主,总是处理完一件事情再处理下一件事情。然而,事实上我们的程序与之相反,通常需要同时完成好几件事情,对于服务器来说,会有并发请求同时到达服务器,此时如果我们依然按人类的顺序思维来完成工作,就会效率地下,不能发挥机器的性能。1、我们处理并发 I/O 请求的方式,主流的有两种:
使用操作系统的多线程做同步 I/O,I/O 阻塞时,线程被调度出去,硬件 I/O 完成了,线程又被调度进来执行,完成对请求的应答。这样的好处是简单、直观、容易理解和维护,代码依然是顺序编程,一次只完成一件事情。代价就是操作系统的线程比较重,I/O 阻塞时上下文切换在内核,切换比较慢。同时,每个操作系统的线程需要占用内核堆栈,这又消耗了系统资源。
常用的并发 I/O 编程方式使用 epoll/select 对 Socket I/O 做轮询,使用 aio 处理文件 I/O,让单个系统线程能够并发地递交多个 I/O 请求,程序一边在提交 I/O 请求、硬件同时在完成 I/O,这样就提高了单个线程的 I/O 并发,同时减少了上下文切换,整体上提高了系统的 I/O 性能。但是这样的编程方式缺点也是显而易见的,因为一个线程要同时处理好几个问题,所以编程不再是线性的,程序必须同时监控多个状态,并在中间状态进入和离开,稍有不慎,容易出错。程序始终还是需要人类去写的,所以又必须为人类思维所容易理解和接受,同时又必须要发挥机器的并发处理能力,于是人们发明了协程。- 继承了操作系统的多线程工作方式,依然使用人类线性思维,一次只做一件事。
- 协程通常不是强制抢占调度的,而是自愿做上下文切换的,这能充分发挥cpu的吞吐能力,避免了 无谓的基于时间片的上下文切换导致的cpu cache失效和流水线中断。
- 不利于计算型密集的程序;因为不是时间片调度的,如果协程不主动切换,就会饿死其他协程。如果协程序忙循环,也会导致 同样结果,忙循环是计算密集的一种特例。
- 隐含的阻塞;例如访问内存时的 page fault 导致陷入内核而不能执行其他协程。
- 阻塞在系统调用里;如果协程阻塞在操作系统调用里(无论是直接调用导致的还是调用其他代码,而这些代码阻塞在内核态时)会导致其他协程不能执行,影响了并发。例如协程调用 malloc,而 malloc 又阻塞在 mmap 上。
- 没有调试器支持;强大的 gdb 多线程调试也不能很好的使用了,协程对 gdb 不可见。代码的重入;不小心容易导致其他代码不知道当前是执行在协程的状态下,从而不能正确处理代码重入问题。例如协程c1当前运行在操作系统线程A上,使用了 pthread_mutex_lock,成功后它又需要等待其他资源而切换出去,此时另外一个协程序c2调度进来,继续在当前操作系统A上运行,它也使用 pthread_mutex_lock, 由于 pthread_mutex_t只适合在操作系统线程级别使用,这会导致 mutex 在本操作系统线程上的重入,结果要么死锁要么产生错误。
bthread 的本质是协程。bthread 组件有多个基于操作系统的 pthread 的工作线程,这些工作线程不断地获取待执行的 bthread 控制结构,并执行他们。执行的方式很简单,要运行某个 bthread 时,把它的寄存器恢复到当前 cpu(在x86_64 上,例如普通累加器 rax, rbx 等,以及堆栈寄存器例如 rsp,和指令地址寄存器 rip,状态寄存器 rflags 等),要切换出去的时候,保存 cpu 寄存器到 bthread 的控制结构,并选择下一个 bthread 的寄存器数据恢复出来就可以。bthread 组件因为使用了多个操作系统线程,操作系统线程能被内核均衡地调度到多个cpu上,而 bthread 组件又能够做 work stealing,当自己这个操作系统线程没有协程可以执行时,会从其他操作系统线程上取走协程,所以能在多个操作系统线程上均衡负载。因此,bthread 组件能发挥多 cpu 的能力。
当前 ChunkServer 使用了 brpc 和 braft,我们的代码大多数执行在这两个组件的上下文里。而这两个组件的执行机制,主要是使用了协程,例如当我们注册了 brpc 的服务时,服务的 method 被调用时,实际上是在一个 bthread 上下文里执行的,brpc 有多个 bthread 的工作线程(pthread),于是能并发地在多个cpu上执行 method 的调用。 braft 也是同样的道理,很多时候执行在 bthread 的上下文里。上面提到协程的缺点,主要是系统调用阻塞问题,但是长久以来,这个问题一直在 chunk server 中存在。例如ext4存储引擎,wal 和 chunk file 的读写,都使用了系统调用,这些系统调用会阻塞在磁盘 I/O 里,导致其他协程不能被执行,你可以说我们有不少 bthread 的调度线程,情况不会严重,但是别忘记,我们有很多 braft 状态机,情况会恶化,特别是大 I/O 时,阻塞时间会很长。更糟糕的是,brpc 内部代码依赖 bthread 做网络 I/O,如果 bthread 的调度线程都被阻塞并耗尽,会导致网络 I/O 卡滞。如果不能尽快将 socket buffer 里的数据取走,则会导致网络带宽不能很好地利用起来。1、使用bthread的同步原语, 只在叶子节点使用pthread同步原语
这样可以解决 bthread 的调度线程被阻塞问题。在软件栈的最底部才能使用 pthread 原语。例如有函数A,B,C他们的调用方式是A→B→C, 只有C可以使用 pthread 同步原语,例如 pthread_mutex_lock,pthread_cond_wait,pthread_rwlock_rdlock/wrlock。其他地方请使用bthread_mutex,bthread_cond, bthread_rwlock。如何利用 aio 和 poll 的结合,可以看 afd[1] 怎么使用的;然后使用 bthread_fd_wait(afd, EPOLLIN) 来等待 aio 的完成;- 如果不能用 aio,可以启动一个线程池,专门代理服务文件 I/O,我们的 pfsdaemon 就是这么干的。
对于读文件,可以使用 preadv2+ RWF_NOWAIT 尝试做内核文件缓冲区读,如果失败了,则把任务交给代理线程池从磁盘上读,因为从磁盘上读注定是要阻塞的,所以让代理去做,完成了再回来唤醒我们这个 bthread,问题不大。
对于于写文件,因为我们都是要求马上落盘的,这个系统调用铁定会阻塞,所以让代理去做就好了,完成了唤醒我们就行了。
// 定义操作类型,可以有很多
enum Op {OP_R, OP_W, OP_FLUSH}
// 定义io任务
struct io_task {
int fd;
Op op;
off_t off;
void *buf;
size_t size;
butil::atomic<int> *butex;
ssize_t result;
};
static moodycamel::BlockingConcurrentQueue<io_task> g_work_queue;
// 具体的io实现函数
ssize_t bthread_pread(int fd, void *buf, size_t sz, off_t off)
{
ssize_t res;
struct iovec iov;
if (bthread_self() != INVALID_BTHREAD) {
// 不处于bthread上下文
return ::pread(fd, buf, sz, off);
}
// 尝试内核缓冲区读
iov.io_base = buf;
io.io_len = sz;
res = preadv2(fd, &iov, 1, off, RWF_NOWAIT);
if (res == sz) {
return res;
}
// 让代理pthread读
io_task *t = new io_task;
// init task
t→fd = fd;
t→op = R;
t→off = off;
t→buf = buf;
t→size = sz;
t→butex = bthred:butex_create();
t→butex→store(0);
g_work_queue.enqueue(t);
// 等完成,bthread上下文切换
bthread::butex_wait(t→butex, 1, NULL);
bthread_butex_destroy(t→butex);
// 完成了,保存返回码
res = t→result;
delete t;
// 删除任务
return res;
}
ssize_t bthread_pwrite(int fd, void *buf, size_t sz, off_t off)
{
ssize_t res;
struct iovec iov;
if (bthread_self() != INVALID_BTHREAD) {
// 不处于bthread上下文
return ::pwrite(fd, buf, sz, off);
}
// 让代理pthread写
io_task *t = new io_task;
// init task
t→fd = fd;
t→op = OP_W;
t→off = off;
t→buf = buf;
t→size = sz;
t→butex = bthred:butex_create();
t→butex→store(0);
g_work_queue.enqueue(t);
// 等完成,bthread上下文切换
bthread::butex_wait(t→butex, 0, NULL);
bthread_butex_destroy(t→butex);
// 完成了,保存返回码
res = t→result;
delete t;
// 删除任务
return res;
}
经过以上修改,可以避免 bthread 阻塞在内核导致 bthread 的调度线程耗尽,而且我们不需要许多 bthread 的调度线程。理想状态下,几个就够了,多出来的 cpu 留给 spdk 和 rdma 的忙轮询线程。
系统的 I/O 并发度可以提高许多,大的 I/O 也不容易让系统的变慢,特别是网络的 I/O 可以及时被处理起来,所以 braft 的心跳可以及时得到维护。bthread 的好处将真正地发挥出来。
点击「阅读原文」可以查看当前实现进度,欢迎感兴趣的社区同学一起参与~
http://www.xmailserver.org/eventfd-aio-test.c关于 Curve
Curve 是一款高性能、易运维、云原生的开源分布式存储系统。可应用于主流的云原生基础设施平台:对接 OpenStack 平台为云主机提供高性能块存储服务;对接 Kubernetes 为其提供 RWO、RWX 等类型的持久化存储卷;对接 PolarFS 作为云原生数据库的高性能存储底座,完美支持云原生数据库的存算分离架构。Curve 亦可作为云存储中间件使用 S3 兼容的对象存储作为数据存储引擎,为公有云用户提供高性价比的共享文件存储。
- GitHub:https://github.com/opencurve/curve- 官网:https://opencurve.io/- 用户论坛:https://ask.opencurve.io/- 微信群:搜索群助手微信号 OpenCurve_bot